package com.hao.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Calendar;
import java.util.Random;

public class SourceParallelCustomTest {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //自定义并行数据源
        DataStreamSource<Event> customSource = env.addSource(new MyParallelCustom()).setParallelism(3);

        customSource.print();

        env.execute("Custom");

    }

    public static class MyParallelCustom implements ParallelSourceFunction<Event> {
        //声明一个标志位
        private Boolean running = true;

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            // 随机生成数据
            Random random = new Random();
            // 定义字段选取的数据集
            String[] users = {"Mary", "Alice", "Bob", "Cary"};
            String[] urls = {"./home", "./cart", "./prod?id=100", "./prod?id=10", "./fav"};

            //循环生成数据
            while (running) {
                String user = users[random.nextInt(users.length)];
                String url = urls[random.nextInt(urls.length)];
                //获取当前系统的时间
                long timestamp = Calendar.getInstance().getTimeInMillis();
                ctx.collect(new Event(user, url, timestamp));
                //生产频率慢一点
                Thread.sleep(2000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
